// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.predicate.index;

import com.yahoo.document.predicate.PredicateHash;

/**
 * Expands range terms from a query to find the set of features they translate to.
 *
 * @author bjorncs
 * @author <a href="mailto:magnarn@yahoo-inc.com">Magnar Nedland</a>
 */
public class PredicateRangeTermExpander {
    private final int arity;
    private final int maxPositiveLevels;
    private final int maxNegativeLevels;
    private final long lowerBound;
    private final long upperBound;

    /**
     * Creates a PredicateRangeTermExpander with default value range.
     *
     * @param arity The arity used to index the predicates
     */
    public PredicateRangeTermExpander(int arity) {
        this(arity, Long.MIN_VALUE, Long.MAX_VALUE);
    }

    /**
     * @param arity      The arity used to index the predicates
     * @param lowerBound The minimum value used by any range predicate in the system
     * @param upperBound The maximum value used by any range predicate in the system
     */
    public PredicateRangeTermExpander(int arity, long lowerBound, long upperBound) {
        this.arity = arity;
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
        this.maxPositiveLevels = calculateMaxLevels(upperBound);
        this.maxNegativeLevels = calculateMaxLevels(-lowerBound);
    }

    private int calculateMaxLevels(long t) {
        int maxLevels = 1;
        while ((t /= this.arity) != 0) {
            maxLevels++;
        }
        return maxLevels;
    }

    /**
     * Expands a range term to a set of features (ranges and edges) to be used in a query.
     *
     * @param key          The term key
     * @param value        The term value
     * @param rangeHandler Handler for range features (long)
     * @param edgeHandler  Handler for edge features (long, int)
     */
    public void expand(String key, long value, RangeHandler rangeHandler, EdgeHandler edgeHandler) {
        if (value < lowerBound || value > upperBound) {
            // Value outside bounds -> expand to nothing.
            return;
        }
        int maxLevels = value > 0 ? maxPositiveLevels : maxNegativeLevels;
        int sign = value > 0 ? 1 : -1;
        // Append key to feature string builder
        StringBuilder builder = new StringBuilder(128);
        builder.append(key).append('=');

        long levelSize = arity;
        long edgeInterval = (value / arity) * arity;
        edgeHandler.handleEdge(createEdgeFeatureHash(builder, edgeInterval), (int) Math.abs(value - edgeInterval));
        for (int i = 0; i < maxLevels; ++i) {
            long start = (value / levelSize) * levelSize;
            if (Math.abs(start) + levelSize - 1 < 0) {  // overflow
                break;
            }
            rangeHandler.handleRange(createRangeFeatureHash(builder, start, start + sign * (levelSize - 1)));
            levelSize *= arity;
            if (levelSize <= 0 && levelSize != Long.MIN_VALUE) {  //overflow
                break;
            }
        }
    }

    private long createRangeFeatureHash(StringBuilder builder, long start, long end) {
        int prefixLength = builder.length();
        String feature = end > 0
                ? builder.append(start).append('-').append(end).toString()
                : builder.append(end).append('-').append(Math.abs(start)).toString();

        builder.setLength(prefixLength);
        return PredicateHash.hash64(feature);
    }

    private long createEdgeFeatureHash(StringBuilder builder, long edgeInterval) {
        int prefixLength = builder.length();
        String feature = builder.append(edgeInterval).toString();
        builder.setLength(prefixLength);
        return PredicateHash.hash64(feature);
    }

    /**
     * Callback for ranges generated by the expansion.
     */
    @FunctionalInterface
    public interface RangeHandler {
        void handleRange(long featureHash);
    }

    /**
     * Callback for edges generated by the expansion.
     */
    @FunctionalInterface
    public interface EdgeHandler {
        void handleEdge(long featureHash, int value);
    }
}
